<?php
/**
 * Created by zengxianfen on 2020/7/23 15:08.
 */

namespace App\Amqp\Producer;


use App\Amqp\Builder\AmqpBuilder;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;

class DelayProducer extends Builder
{
    /**
     * Declare the target queue, bind it to the message's exchange and publish the message.
     *
     * The actual work is done by produceMessage(); it is invoked through the
     * `retry` helper with a retry count of 1.
     *
     * @param ProducerMessageInterface $producerMessage message providing payload, properties, exchange, routing key and pool name
     * @param AmqpBuilder $queueBuilder source of the queue name and the queue_declare() options
     * @param bool $confirm when true a confirm channel is used and the result reflects whether the ack handler fired
     * @param int $timeout value handed to wait_for_pending_acks_returns() (presumably seconds; confirm against php-amqplib)
     * @return bool always true when $confirm is false; otherwise true only if an ack was received
     * @throws \Throwable any error raised while declaring, binding or publishing
     */
    public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5): bool
    {
        return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        });
    }

    /**
     * Perform a single publish attempt on a pooled connection.
     *
     * Steps: apply annotation-provided routing key / exchange to the message,
     * take a connection from the pool named by the message, open a channel,
     * declare the queue, bind it, publish, then wait for pending acks/returns.
     * The connection is always released; on any failure it is reconnected first
     * and the original exception is rethrown.
     *
     * @param ProducerMessageInterface $producerMessage message to publish
     * @param AmqpBuilder $queueBuilder source of the queue name and the queue_declare() options
     * @param bool $confirm whether to publish on a confirm channel
     * @param int $timeout value handed to wait_for_pending_acks_returns()
     * @return bool always true when $confirm is false; otherwise whether the ack handler was invoked
     * @throws \Throwable
     */
    private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5): bool
    {
        // Flipped to true by the ack handler registered on the channel below.
        $result = false;
        $this->injectMessageProperty($producerMessage);
        $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
        $pool = $this->getConnectionPool($producerMessage->getPoolName());
        /** @var \Hyperf\Amqp\Connection $connection */
        $connection = $pool->get();
        try {
            // The channel is opened inside the try block so that a failure here
            // still reaches the reconnect/release handling below instead of
            // leaving the connection checked out.
            $channel = $confirm ? $connection->getConfirmChannel() : $connection->getChannel();
            $channel->set_ack_handler(static function () use (&$result) {
                $result = true;
            });

            $queue = $queueBuilder->getQueue();
            $exchange = $producerMessage->getExchange();
            $routingKey = $producerMessage->getRoutingKey();

            // Declare the queue.
            $channel->queue_declare(
                $queue,
                $queueBuilder->isPassive(),
                $queueBuilder->isDurable(),
                $queueBuilder->isExclusive(),
                $queueBuilder->isAutoDelete(),
                $queueBuilder->isNowait(),
                $queueBuilder->getArguments(),
                $queueBuilder->getTicket()
            );
            // Bind the queue to the exchange under the message's routing key.
            $channel->queue_bind($queue, $exchange, $routingKey);
            // Publish the message.
            $channel->basic_publish($message, $exchange, $routingKey);
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (\Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();
            throw $exception;
        } finally {
            $connection->release();
        }

        // Without confirm mode there is no ack to wait for, so success is assumed.
        return $confirm ? $result : true;
    }

    /**
     * Copy routing key and exchange from the message class's Producer annotation onto the message.
     *
     * Does nothing when the AnnotationCollector class is unavailable or the
     * message class carries no Producer annotation. Only truthy annotation
     * values overwrite what is already set on the message.
     *
     * @param ProducerMessageInterface $producerMessage message whose routing key / exchange may be overwritten
     */
    private function injectMessageProperty(ProducerMessageInterface $producerMessage): void
    {
        if (! class_exists(AnnotationCollector::class)) {
            return;
        }

        /** @var \Hyperf\Amqp\Annotation\Producer $annotation */
        $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
        if (! $annotation) {
            return;
        }

        if ($annotation->routingKey) {
            $producerMessage->setRoutingKey($annotation->routingKey);
        }
        if ($annotation->exchange) {
            $producerMessage->setExchange($annotation->exchange);
        }
    }
}